package pfq.demo.rx;

import java.util.ArrayList;
import java.util.List;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;

/**
 * 主要演示RxJava操作符的使用
 */
public class Operator {
    public static void main(String[] args) {
        // map：类型变换，此例子将Integer转成为String
        System.out.println("### map ###");
        Observable.just(123).map(new io.reactivex.functions.Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                System.out.println("转换前是Integer：" + integer);
                return String.valueOf(integer);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("转换后是String：" + s);
            }
        });

        // flatMap: 扁平化的类型变换，专门用来解决嵌套问题，可以将一个observable转换成多个observable，将多级任务铺平后，一个一个发射出去，避免for循环的嵌套
        // 其实可以想象一下公司的管理，一级一级的，总经理下面有几个部门经理，部门经理下面有几个员工，而flat（扁平化）后，大家都是员工，没有了分级的概念
        System.out.println("");
        System.out.println("### flatMap ###");
        System.out.println("for循环嵌套处理: ");
        List<List<String>> data = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            List<String> itemData = new ArrayList<>();
            itemData.add("a" + i);
            itemData.add("b" + i);
            data.add(itemData);
        }
        for (List<String> itemList :
                data) {
            for (String itemString :
                    itemList) {
                System.out.print(itemString + " ");
            }
        }
        System.out.println();
        System.out.println("flatMap处理: ");
        Observable.fromIterable(data).flatMap(new Function<List<String>, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(List<String> strings) {
                return Observable.fromIterable(strings);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String o) {
                System.out.print(o + " ");
            }
        });

        System.out.println();
        System.out.println();
        System.out.println("### dispose ###");
        Observable.just(1, 2, 3)
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        // 当下游取消订阅的时候会收到该回调
                        System.out.print("doOnDispose");
                    }
                }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                // 取消订阅
                d.dispose();
            }

            @Override
            public void onNext(Integer integer) {
                System.out.print("dispose: " + integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

    }
}
